package example

/**
  * Created by zhangyaran on 2017/10/30.
  */

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.mllib.recommendation._
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
import scala.util.Random


object RunRecommender {

  def main(args: Array[String]): Unit = {
    // Instantiate the SparkContext
    val sc = new SparkContext(new SparkConf().setAppName("Recommender"))

    // Base path where the data files live
    val base = "hdfs:///user/ds/"
    // userID - artistID - play count
    val rawUserArtistData = sc.textFile(base + "user_artist_data.txt")
    // artistID - artist name
    val rawArtistData = sc.textFile(base + "artist_data.txt")
    // Misspelled or nonstandard artistID - canonical artistID
    val rawArtistAlias = sc.textFile(base + "artist_alias.txt")


    // Preprocess and inspect the three data sets
    preparation(rawUserArtistData, rawArtistData, rawArtistAlias)

    // Build the model
    model(sc, rawUserArtistData, rawArtistData, rawArtistAlias)

    // Evaluate the model
    evaluate(sc, rawUserArtistData, rawArtistAlias)
    // Produce recommendations
    recommend(sc, rawUserArtistData, rawArtistData, rawArtistAlias)
  }


  // Parse "artistID<TAB>artistName" lines into (artist ID, name) pairs, dropping malformed records
  def buildArtistByID(rawArtistData: RDD[String]): RDD[(Int, String)] =
    rawArtistData.flatMap { line =>
      // span splits at the first tab: id is everything before it, name is the tab plus the rest
      val (id, name) = line.span(_ != '\t')
      if (name.isEmpty) {
        None
      } else {
        try {
          Some((id.toInt, name.trim))
        } catch {
          case e: NumberFormatException => None
        }
      }
    }


  // Parse "badID<TAB>goodID" lines into a Map from misspelled or nonstandard artist IDs to canonical IDs
  def buildArtistAlias(rawArtistAlias: RDD[String]): scala.collection.Map[Int, Int] =
    rawArtistAlias.flatMap { line =>
      val tokens = line.split('\t')
      if (tokens(0).isEmpty) {
        None
      } else {
        Some((tokens(0).toInt, tokens(1).toInt))
      }
    }.collectAsMap()


  // Prepare and inspect the data
  def preparation(
                   rawUserArtistData: RDD[String],
                   rawArtistData: RDD[String],
                   rawArtistAlias: RDD[String]) = {
    // Summary statistics over the user IDs
    val userIDStats = rawUserArtistData.map(_.split(' ')(0).toDouble).stats()
    // Summary statistics over the artist IDs
    val itemIDStats = rawUserArtistData.map(_.split(' ')(1).toDouble).stats()

    // Spark MLlib's ALS requires user and product IDs to be numeric, nonnegative 32-bit integers,
    // so any ID larger than Integer.MAX_VALUE (2147483647) is invalid. The stats printed below show
    // the maximum observed IDs, which is how we confirm they fit (a small check follows the printlns).
    println(userIDStats)
    println(itemIDStats)
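
    // A minimal sanity check added here purely as an illustration (not part of the original flow):
    // assert that the largest observed user and artist IDs fit in a nonnegative 32-bit Int,
    // which is the requirement MLlib's ALS imposes on IDs.
    require(userIDStats.max <= Int.MaxValue.toDouble && itemIDStats.max <= Int.MaxValue.toDouble,
      "user and artist IDs must fit in a nonnegative 32-bit Int for MLlib ALS")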


    // artistByID: org.apache.spark.rdd.RDD[(Int, String)], artist ID -> artist name
    val artistByID = buildArtistByID(rawArtistData)
    // artistAlias: scala.collection.Map[Int,Int], e.g. Map(6803336 -> 1000010, 6663187 -> 1992, 2124273 -> 2814, ...)
    val artistAlias = buildArtistAlias(rawArtistAlias)


    // Take the first (badID, goodID) entry of artistAlias, e.g. badID = 6803336, goodID = 1000010
    val (badID, goodID) = artistAlias.head
    // e.g. WrappedArray(Aerosmith (unplugged)) -> WrappedArray(Aerosmith)
    println(artistByID.lookup(badID) + " -> " + artistByID.lookup(goodID))
  }


  // Turn the raw "userID artistID count" lines into Rating objects
  def buildRatings(
                    rawUserArtistData: RDD[String],
                    bArtistAlias: Broadcast[Map[Int, Int]]) = {
    rawUserArtistData.map { line =>
      val Array(userID, artistID, count) = line.split(' ').map(_.toInt)
      // Map the artist ID to its canonical (good) ID; if no alias entry exists, keep the original ID
      val finalArtistID = bArtistAlias.value.getOrElse(artistID, artistID)
      // Build a Rating (org.apache.spark.mllib.recommendation.Rating) from the user, resolved artist, and play count
      Rating(userID, finalArtistID, count)
    }
  }


  // Build a model and spot-check its recommendations
  def model(
             sc: SparkContext,
             rawUserArtistData: RDD[String],
             rawArtistData: RDD[String],
             rawArtistAlias: RDD[String]): Unit = {


    // Broadcast the badID -> goodID alias map (org.apache.spark.broadcast.Broadcast[scala.collection.Map[Int,Int]])
    val bArtistAlias = sc.broadcast(buildArtistAlias(rawArtistAlias))


    // Build the training data (trainData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating])
    val trainData = buildRatings(rawUserArtistData, bArtistAlias).cache()


    // Train the model (model: org.apache.spark.mllib.recommendation.MatrixFactorizationModel)
    // with rank = 10, iterations = 5, lambda = 0.01, alpha = 1.0
    val model = ALS.trainImplicit(trainData, 10, 5, 0.01, 1.0)


    // Explicitly uncache the training RDD now that the model has been trained
    trainData.unpersist()
    // (90,-0.34101608395576477, 0.9143036007881165, 0.8365337252616882, 0.48369652032852173, 0.5416117906570435, -0.40115535259246826, 0.2735286056995392, 0.17051056027412415, -0.4300401508808136, -0.0985608622431755)
    println(model.userFeatures.mapValues(_.mkString(", ")).first())


    // Use the trained model to recommend 5 artists to user 2093760
    val userID = 2093760
    // recommendations: Array[org.apache.spark.mllib.recommendation.Rating] = Array(Rating(2093760,1300642,0.03101707128027848),..)
    val recommendations = model.recommendProducts(userID, 5)
    // Rating(2093760,1300642,0.03101707128027848)
    // Rating(2093760,2814,0.030879657724463577)
    // Rating(2093760,1001819,0.03010725268263153)
    // Rating(2093760,1037970,0.02981504815221999)
    // Rating(2093760,1007614,0.029730034526127574)
    recommendations.foreach(println)
    // Collect the recommended artist IDs as a Set (removing duplicates)
    val recommendedProductIDs = recommendations.map(_.product).toSet


    // All raw plays by user 2093760, as (userID, artistID, count) tokens (rawArtistsForUser: org.apache.spark.rdd.RDD[Array[String]])
    val rawArtistsForUser = rawUserArtistData.map(_.split(' ')).filter { case Array(user, _, _) => user.toInt == userID }
    // Artist IDs this user has already listened to (existingProducts: scala.collection.immutable.Set[Int] = Set(1255340, 942, 1180, 813, 378))
    val existingProducts = rawArtistsForUser.map { case Array(_, artist, _) => artist.toInt }.collect().toSet


    // artistByID: org.apache.spark.rdd.RDD[(Int, String)]
    val artistByID = buildArtistByID(rawArtistData)


    // Names of the artists the user has already listened to:
    // David Gray
    // Blackalicious
    // Jurassic 5
    // The Saw Doctors
    // Xzibit
    artistByID.filter { case (id, name) => existingProducts.contains(id) }.values.collect().foreach(println)

    // Names of the recommended artists:
    // 50 Cent
    // Jay-Z
    // Kanye West
    // 2Pac
    // The Game
    artistByID.filter { case (id, name) => recommendedProductIDs.contains(id) }.values.collect().foreach(println)


    unpersist(model)
  }


  // areaUnderCurve() takes a function as its third parameter; callers pass in MatrixFactorizationModel's
  // predict() (or any other prediction function with the same shape)
  def areaUnderCurve(
                      positiveData: RDD[Rating], // held-out validation data
                      bAllItemIDs: Broadcast[Array[Int]], // all distinct artist IDs
                      predictFunction: (RDD[(Int, Int)] => RDD[Rating])) = {
    // What this actually computes is AUC, per user. The result is actually something
    // that might be called "mean AUC".


    // Take held-out data as the "positive", and map to tuples
    val positiveUserProducts = positiveData.map(r => (r.user, r.product))
    // Make predictions for each of them, including a numeric score, and gather by user
    val positivePredictions = predictFunction(positiveUserProducts).groupBy(_.user)


    // BinaryClassificationMetrics.areaUnderROC is not used here since there are really lots of
    // small AUC problems, and it would be inefficient, when a direct computation is available.


    // Create a set of "negative" products for each user. These are randomly chosen
    // from among all of the other items, excluding those that are "positive" for the user.
    val negativeUserProducts = positiveUserProducts.groupByKey().mapPartitions {
      // mapPartitions operates on many (user,positive-items) pairs at once
      userIDAndPosItemIDs => {
        // Init an RNG and the item IDs set once for partition
        val random = new Random()
        val allItemIDs = bAllItemIDs.value
        userIDAndPosItemIDs.map { case (userID, posItemIDs) =>
          val posItemIDSet = posItemIDs.toSet
          val negative = new ArrayBuffer[Int]()
          var i = 0
          // Keep about as many negative examples per user as positive.
          // Duplicates are OK
          while (i < allItemIDs.size && negative.size < posItemIDSet.size) {
            val itemID = allItemIDs(random.nextInt(allItemIDs.size))
            if (!posItemIDSet.contains(itemID)) {
              negative += itemID
            }
            i += 1
          }
          // Result is a collection of (user,negative-item) tuples
          negative.map(itemID => (userID, itemID))
        }
      }
    }.flatMap(t => t)
    // flatMap breaks the collections above down into one big set of tuples


    // Make predictions on the rest:
    val negativePredictions = predictFunction(negativeUserProducts).groupBy(_.user)


    // Join positive and negative by user
    positivePredictions.join(negativePredictions).values.map {
      case (positiveRatings, negativeRatings) =>
        // AUC may be viewed as the probability that a random positive item scores
        // higher than a random negative one. Here the proportion of all positive-negative
        // pairs that are correctly ranked is computed. The result is equal to the AUC metric
        // (a tiny worked example follows this method).
        var correct = 0L
        var total = 0L
        // For each pairing,
        for (positive <- positiveRatings;
             negative <- negativeRatings) {
          // Count the correctly-ranked pairs
          if (positive.rating > negative.rating) {
            correct += 1
          }
          total += 1
        }
        // Return AUC: fraction of pairs ranked correctly
        correct.toDouble / total
    }.mean() // Return mean AUC over users
  }
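

  // A tiny worked illustration of the pair-counting above, using purely hypothetical scores (this
  // helper is an added example, not part of the original flow): with positive scores 0.9 and 0.6 and
  // negative scores 0.7 and 0.2, three of the four positive-negative pairs are ranked correctly
  // (only 0.6 > 0.7 fails), so the per-user AUC is 0.75.
  def aucForOneUserExample(): Double = {
    val positiveScores = Seq(0.9, 0.6)
    val negativeScores = Seq(0.7, 0.2)
    // Enumerate every positive-negative pair and count the correctly ranked ones
    val pairs = for (p <- positiveScores; n <- negativeScores) yield (p, n)
    pairs.count { case (p, n) => p > n }.toDouble / pairs.size
  }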


  // predictMostListened(sc: SparkContext, train: RDD[Rating])(allData: RDD[(Int, Int)]): RDD[Rating]
  // Recommend the most-played artists to every user. This strategy is not personalized at all, but it
  // is simple and may still be effective, so it defines a baseline model whose AUC we can measure.
  // The function has two parameter lists: applying the first two arguments produces a partially applied
  // function that itself takes one argument (allData) and returns predictions, so
  // predictMostListened(sc, trainData) evaluates to a function (a short illustration follows this method).
  def predictMostListened(sc: SparkContext, train: RDD[Rating])(allData: RDD[(Int, Int)]) = {
    // Sum the play counts per artist in the training set and broadcast the map to every executor
    // (bListenCount: org.apache.spark.broadcast.Broadcast[scala.collection.Map[Int,Double]]),
    // e.g. bListenCount.value = Map(6867269 -> 4.0, 10268079 -> 9.0, 1249474 -> 38.0, ...)
    val bListenCount = sc.broadcast(train.map(r => (r.product, r.rating)).reduceByKey(_ + _).collectAsMap())
    // Predict each (user, artist) pair's rating as that artist's total play count (0.0 if unseen in training)
    allData.map { case (user, product) => Rating(user, product, bListenCount.value.getOrElse(product, 0.0)) }
  }
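

  // A minimal sketch, added only to illustrate the partial application described above (this helper and
  // its name are not part of the original program flow): applying the first parameter list of
  // predictMostListened yields a function of type RDD[(Int, Int)] => RDD[Rating], which is exactly the
  // shape areaUnderCurve expects for its predictFunction argument.
  def predictMostListenedExample(sc: SparkContext, train: RDD[Rating], cvData: RDD[Rating]): Unit = {
    // Partially apply the first parameter list; no computation happens yet
    val predictFn: RDD[(Int, Int)] => RDD[Rating] = predictMostListened(sc, train)
    // Apply the remaining parameter list to score (user, artist) pairs from the validation data
    val predictions = predictFn(cvData.map(r => (r.user, r.product)))
    predictions.take(5).foreach(println)
  }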


  def evaluate(
                sc: SparkContext,
                rawUserArtistData: RDD[String],
                rawArtistAlias: RDD[String]): Unit = {
    // Broadcast the badID -> goodID alias map (org.apache.spark.broadcast.Broadcast[scala.collection.Map[Int,Int]])
    val bArtistAlias = sc.broadcast(buildArtistAlias(rawArtistAlias))


    // Build Rating objects from all of the data (allData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating])
    val allData = buildRatings(rawUserArtistData, bArtistAlias)


    // 90% for training, 10% as the validation set (randomSplit(weights: Array[Double], seed: Long): Array[org.apache.spark.rdd.RDD[T]])
    val Array(trainData, cvData) = allData.randomSplit(Array(0.9, 0.1))

    // Cache both the training and validation sets in memory
    trainData.cache()
    cvData.cache()


    // Collect all distinct artist IDs to the driver program
    val allItemIDs = allData.map(_.product).distinct().collect()

    // Broadcast all artist IDs to every executor
    val bAllItemIDs = sc.broadcast(allItemIDs)


    // mostListenedAUC: Double = 0.9393479200578481
    // This non-personalized baseline already reaches an AUC of about 0.94, which is quite good for such
    // a simple method, so the personalized models evaluated below need to clearly beat it to be worthwhile.
    val mostListenedAUC = areaUnderCurve(cvData, bAllItemIDs, predictMostListened(sc, trainData))
    println(mostListenedAUC)


    // Next, search for better hyperparameters
    /*
      We could take a different 90% of the data set as the training set and repeat the evaluation, and
      averaging the resulting AUC values might give a better estimate of how the algorithm performs on
      this data set. A common practice is to split the data into k subsets of roughly equal size, train
      on k-1 of them, and evaluate on the remaining one, repeating the process k times so that each
      subset is used for evaluation once. This is called k-fold cross-validation; MLlib's helper method
      MLUtils.kFold() provides some support for this technique (a hedged kFoldAuc sketch follows this
      method).

      Next we choose the hyperparameters. These are supplied by the caller, not learned by the algorithm.
      ALS.trainImplicit() takes the following parameters:
      * rank = 10: the number of latent factors in the model, i.e. the number of columns in the
        user-feature and product-feature matrices; in general it is also the rank of the factorization
      * iterations = 5: the number of iterations the factorization runs; more iterations take more time
        but may produce a better factorization
      * lambda = 0.01: the standard overfitting parameter; larger values resist overfitting more, but
        values that are too large reduce the accuracy of the factorization
      * alpha = 1.0: controls the relative weight of observed versus unobserved user-product
        interactions in the factorization

      rank, lambda, and alpha can be viewed as hyperparameters of the model (iterations is more of a
      constraint on the resources used by the factorization). They do not end up in the matrices inside
      the MatrixFactorizationModel; those matrices are the model's parameters, whose values are chosen
      by the algorithm. The hyperparameters used so far are not necessarily optimal.
      Below we try the 8 possible combinations of rank = 10 or 50, lambda = 1.0 or 0.0001, and
      alpha = 1.0 or 40.0. These values are still guesses, but they cover a broad range of the
      parameter space. The results of the combinations are printed sorted by AUC, from high to low.

      In our results, larger values of lambda look slightly better. This suggests the model is somewhat
      prone to overfitting, and therefore needs a larger lambda to keep it from fitting each user's
      sparse input data too exactly.

      Strictly speaking, understanding what the hyperparameters mean is not required, but knowing their
      typical ranges helps you start the search over a parameter space that is neither too large nor
      too small.
    */
    val evaluations =
    for (rank <- Array(10, 50);
         lambda <- Array(1.0, 0.0001);
         alpha <- Array(1.0, 40.0))
      yield {
        val model = ALS.trainImplicit(trainData, rank, 10, lambda, alpha)
        val auc = areaUnderCurve(cvData, bAllItemIDs, model.predict)
        unpersist(model)
        ((rank, lambda, alpha), auc)
      }


    evaluations.sortBy(_._2).reverse.foreach(println)


    trainData.unpersist()
    cvData.unpersist()
  }
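

  // A hedged sketch of the k-fold cross-validation mentioned in evaluate() above. This helper is an
  // added illustration, not part of the original program flow: MLUtils.kFold splits the data into k
  // (training, validation) pairs, a model is trained on each training fold and scored on the matching
  // validation fold with areaUnderCurve, and the k AUC values are averaged. The ALS hyperparameters
  // below simply reuse the values from model() and are assumptions, not tuned choices.
  def kFoldAuc(
                allData: RDD[Rating],
                bAllItemIDs: Broadcast[Array[Int]],
                k: Int = 3): Double = {
    val folds = org.apache.spark.mllib.util.MLUtils.kFold(allData, k, 42)
    val aucs = folds.map { case (train, validation) =>
      train.cache()
      validation.cache()
      val model = ALS.trainImplicit(train, 10, 5, 0.01, 1.0)
      val auc = areaUnderCurve(validation, bAllItemIDs, model.predict)
      unpersist(model)
      train.unpersist()
      validation.unpersist()
      auc
    }
    // Mean AUC across the k folds
    aucs.sum / aucs.length
  }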


  def recommend(
                 sc: SparkContext,
                 rawUserArtistData: RDD[String],
                 rawArtistData: RDD[String],
                 rawArtistAlias: RDD[String]): Unit = {


    // Broadcast the artist alias map, build Rating objects from all of the data, and train a final model
    // on everything, using the hyperparameters selected after evaluation (rank = 50, iterations = 10,
    // lambda = 1.0, alpha = 40.0)
    val bArtistAlias = sc.broadcast(buildArtistAlias(rawArtistAlias))
    val allData = buildRatings(rawUserArtistData, bArtistAlias).cache()
    val model = ALS.trainImplicit(allData, 50, 10, 1.0, 40.0)
    allData.unpersist()


    // Recommend 5 artists to user 2093760 and look up their names
    val userID = 2093760
    val recommendations = model.recommendProducts(userID, 5)
    val recommendedProductIDs = recommendations.map(_.product).toSet


    val artistByID = buildArtistByID(rawArtistData)


    artistByID.filter { case (id, name) => recommendedProductIDs.contains(id) }.
      values.collect().foreach(println)


    // Make 5 recommendations for each of 100 sample users (allData was unpersisted above, so it is recomputed here)
    val someUsers = allData.map(_.user).distinct().take(100)
    val someRecommendations = someUsers.map(userID => model.recommendProducts(userID, 5))
    someRecommendations.map(
      recs => recs.head.user + " -> " + recs.map(_.product).mkString(", ")
    ).foreach(println)


    unpersist(model)
  }


  def unpersist(model: MatrixFactorizationModel): Unit = {
    // At the moment, it's necessary to manually unpersist the RDDs inside the model
    // when done with it in order to make sure they are promptly uncached
    model.userFeatures.unpersist()
    model.productFeatures.unpersist()
  }


}